Skip to content

Fix orphaned subprocesses and supervisor crash on heartbeat 409#65738

Open
cmettler wants to merge 1 commit into
apache:mainfrom
cmettler:fix_65505
Open

Fix orphaned subprocesses and supervisor crash on heartbeat 409#65738
cmettler wants to merge 1 commit into
apache:mainfrom
cmettler:fix_65505

Conversation

@cmettler

@cmettler cmettler commented Apr 23, 2026

Copy link
Copy Markdown

When a running TaskInstance is forcibly transitioned out of running (scheduler reset, REST PATCH, etc.), the next heartbeat from the still-running task-runner returns HTTP 409 and the supervisor kills the task. On Linux this produced two bugs:

  1. Orphaned subprocesses — any Popen child the task-runner had spawned (@task.virtualenv, DockerOperator, BashOperator, Cosmos dbt, etc.) was reparented to PID 1 and kept running until it finished on its own, wasting CPU/RAM/API quota.
  2. Supervisor crash — ~60s later _cleanup_open_sockets() closed the selector while _service_subprocess() was still polling it, raising ValueError: I/O operation on closed epoll object (regression from Fix lingering task supervisors when EOF is missed #51180).

Fix

Place the task-runner in its own session via os.setsid() immediately after fork, then have kill() signal the whole process group via os.killpg(os.getpgid(pid), sig). This reaches every subprocess the task-runner spawned. Grandchildren without a SIGTERM handler exit promptly and close their inherited pipes, so the supervisor drains _open_sockets normally and never enters the cleanup-the-selector-mid-loop path.

killpg/getpgid fall back to self._process.send_signal(sig) on ProcessLookupError or PermissionError, preserving behaviour when the group has vanished or permissions are lacking.

Tests

  • test_kill_signals_process_group — primary path uses killpg.
  • test_kill_falls_back_to_send_signal_when_group_signal_fails (4 params: {ProcessLookupError, PermissionError} × {getpgid, killpg}).
  • test_child_is_session_leader — real-fork regression: asserts child's PGID == child's PID after ActivitySubprocess.start().
  • Two existing kill tests updated to mock os.getpgid / os.killpg explicitly.
  • All 136 supervisor tests pass (uv run --project task-sdk pytest task-sdk/tests/task_sdk/execution_time/test_supervisor.py).

closes: #65505


Was generative AI tooling used to co-author this PR?
  • Yes (Claude Opus 4.7 (1M context))

Generated-by: Claude Opus 4.7 (1M context) following the guidelines


Important

🛠️ Maintainer triage note for @cmettler · by @potiuk · 2026-06-22 06:31 UTC

Helpful heads-up from the maintainers — please address before this PR can be reviewed (see the Pull Request quality criteria):

  • Tests — failing: provider distributions tests / Compat 2.11.1:P3.10:, provider distributions tests / Compat 3.0.6:P3.10:, provider distributions tests / Compat 3.1.8:P3.10:.

The ball is in your court — you've been assigned to this PR. Fix the above, then mark it Ready for review.

Automated triage — may be imperfect; a maintainer takes the next look.

@boring-cyborg

boring-cyborg Bot commented Apr 23, 2026

Copy link
Copy Markdown

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label Apr 27, 2026
@potiuk potiuk marked this pull request as draft June 22, 2026 06:34
When a running TaskInstance is forcibly transitioned out of `running`
(e.g. the scheduler resets a stale heartbeat, or an operator PATCHes the
state to `failed`), the task-runner's next heartbeat returns HTTP 409
and the supervisor kills the task. Before this change two things went
wrong on Linux:

1. Subprocesses the task-runner had spawned (`@task.virtualenv` /
   `PythonVirtualenvOperator` children, `DockerOperator` exec, Bash
   shells) were reparented to PID 1 and kept running as orphans until
   they finished on their own - wasting CPU, RAM and third-party API
   quota.
2. About 60s later, `_cleanup_open_sockets()` closed the selector while
   `_service_subprocess()` was still using it, so the supervisor
   crashed with `ValueError: I/O operation on closed epoll object`
   (regression from PR apache#51180).

The task-runner is now placed in its own session via `os.setsid()`
immediately after fork, so its process group ID equals its PID. The
supervisor's `kill()` signals the whole group via
`os.killpg(os.getpgid(pid), sig)`, which reaches every subprocess the
task-runner spawned. Grandchildren without a SIGTERM handler exit
promptly, close their inherited pipes, and the supervisor drains
`_open_sockets` normally - so `_cleanup_open_sockets()` is never
triggered and the selector is never closed mid-loop.

`os.killpg`/`os.getpgid` fall back to `self._process.send_signal(sig)`
on `ProcessLookupError` or `PermissionError`, preserving prior
behaviour when the group has vanished (e.g. the task was already
reaped) or permissions are lacking.

closes: apache#65505
@cmettler

Copy link
Copy Markdown
Author

CI failures were unrelated — caused by azure-storage-blob 12.30.0 breaking the WASB SAS-token tests (issue #68482), fixed upstream in #68490. After rebasing onto current main, that fix is now in our sources and CI should pass. PR is ready for review when you have time.


Drafted-by: Claude Code (Opus 4.7); reviewed by @cmettler before posting

@cmettler cmettler marked this pull request as ready for review June 24, 2026 05:52
# have been placed in its own session via os.setsid() at fork
# time (see start()). See issue #65505.
try:
os.killpg(os.getpgid(self._process.pid), sig)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

os.killpg(os.getpgid(self._process.pid), sig) trusts that the child has already run os.setsid(). Two paths break that assumption and make this signal the supervisor's own process group:

  1. setsid() is wrapped in with suppress(OSError) in start(), so if it ever fails the child stays in the supervisor's group.
  2. _on_child_started calls self.kill(signal.SIGKILL) on any exception from task_instances.start() (line 1385). setsid() runs in the forked child and the parent doesn't synchronize on it, so a synchronous failure there can reach kill() before the child has run setsid(). (A 409 over the network is fine, since the round-trip gives the child time to run it; a local/synchronous failure isn't.)

In both cases os.getpgid(child) returns the supervisor's PGID and os.killpg(..., SIGKILL) hits the supervisor and every sibling in its group. The except (ProcessLookupError, PermissionError) fallback doesn't catch this because nothing is raised. test_child_is_session_leader asserts this exact invariant ("so os.killpg() does not signal the supervisor itself"), but the production path has no guard.

Either set the group race-free from the parent too (with suppress(OSError): os.setpgid(pid, pid) right after the fork) or guard the kill site:

pgid = os.getpgid(self._process.pid)
if pgid == os.getpgid(0):
    self._process.send_signal(sig)
else:
    os.killpg(pgid, sig)

Separately: this group-signal only runs on the kill() path. The graceful path in wait() (_forward_signal -> os.kill(self.pid, signum)) still signals the task-runner alone, so the orphan leak this PR targets persists on graceful SIGTERM (e.g. K8s pod termination). Now that the child is a session leader, that path could use killpg too.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this more: airflow.utils.process_utils.reap_process_group() already implements this whole teardown, and it has the exact guard that's missing here:

if not IS_WINDOWS and process_group_id == os.getpgid(0):
    raise RuntimeError("I refuse to kill myself")

It also covers what this loop doesn't: SIGTERM -> wait -> SIGKILL escalation via psutil.wait_procs, EPERM -> sudo -n kill for the run_as_user case, and ESRCH (the "child hasn't changed its group yet" race) by falling back to signalling the PID directly.

It lives in airflow-core, and the supervisor keeps its airflow.* imports lazy for worker isolation, so it's not a drop-in import. But rather than hand-rolling a second, less complete version here, should we port/copy reap_process_group (+ its self-group guard) into task-sdk and use that instead? One tested teardown path beats two that can drift.

# only hits the task-runner and any Popen children are reparented
# to PID 1 and leak as orphans. See issue #65505.
with suppress(OSError):
os.setsid()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This setsid() (and the killpg in kill()) lands in the base WatchedSubprocess, and kill() is defined only here with no subclass override, so it applies to every subprocess type (DagFileProcessorProcess, TriggerRunnerSupervisor, CallbackSubprocess), not just the ActivitySubprocess task-runner this PR describes.

The triggerer's long-lived async subprocess installs its own SIGINT/SIGTERM handlers and expects a graceful shutdown; making it a session leader and group-signalling it changes that. Detaching these children into their own session also means a terminal Ctrl-C (foreground-group SIGINT) no longer reaches them directly. The neighbouring use_exec handles exactly this by being opt-in per subclass (its docstring even notes the DAG processor and triggerer as a follow-up).

Suggest the same here: a new_session: bool = False param on start() set True only in ActivitySubprocess, with the killpg branch in kill() gated on it. That keeps the change scoped to the task-runner, and confines the self-signal risk flagged on the killpg line to the one path you've actually tested.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While you're here: this setsid() satisfies the # TODO: Make this process a session leader at line 420 (top of _fork_main). That TODO is now stale and can be dropped.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the scope point: the existing set_new_process_group() (airflow.utils.process_utils) does this with os.setpgid(0, 0) rather than os.setsid(). That still gives a killable process group (so killpg reaches the whole tree) but without creating a new session / detaching the controlling terminal, so it avoids the Ctrl-C / foreground-group change noted above. Its companion reap_process_group() already has the SIGTERM -> SIGKILL escalation and EPERM/ESRCH handling.

Same question as on the kill() thread: should we port/copy these over into task-sdk and use them here instead of a fresh setsid/killpg path? Reusing the existing setpgid group + self-group guard would address both the over-broad scope and the self-signal risk in one go.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(just thinking out loud -- we can surely do it in separate PR too but wanted to raise so I don't forget either :) )

@seanmuth

seanmuth commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

Following up on @ashb’s internal note about the session-leader TODO:

Confirmed this PR does make the task-runner a session leader — os.setsid() is added right after the fork in start(), and kill() now signals the whole group via os.killpg(os.getpgid(self._process.pid), sig) with a send_signal() fallback (plus a test asserting the child’s PGID == its own PID post-fork).

That makes the pre-existing # TODO: Make this process a session leader in _fork_main obsolete, but the PR doesn’t currently touch it:

# TODO: Make this process a session leader

Could you drop that TODO comment as part of this change? Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:task-sdk ready for maintainer review Set after triaging when all criteria pass.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Task-runner's venv / Popen subprocesses become orphans on heartbeat 409; supervisor also crashes with ValueError: I/O operation on closed epoll object

4 participants